From 8d3ebbb7a998463ca9c596374c293e40eeef6bed Mon Sep 17 00:00:00 2001 From: "mjw@wray-m-3.hpl.hp.com" Date: Tue, 6 Jul 2004 12:57:26 +0000 Subject: [PATCH] bitkeeper revision 1.1041.7.1 (40eaa1b63pn-AWBaQWrFszKHGANStQ) Tidy up messaging to device backends. Add message response handlers indexed by message id instead of relying on a queue of deferreds. Tidy up error handling so that deferred errors are caught. --- tools/python/xen/xend/XendDomain.py | 1 + tools/python/xen/xend/XendDomainInfo.py | 17 ++- tools/python/xen/xend/XendMigrate.py | 1 + tools/python/xen/xend/server/SrvBase.py | 1 + tools/python/xen/xend/server/SrvDaemon.py | 1 + tools/python/xen/xend/server/SrvDomainDir.py | 12 +- tools/python/xen/xend/server/blkif.py | 92 +++++++------ tools/python/xen/xend/server/controller.py | 136 +++++++++++++------ tools/python/xen/xend/server/messages.py | 14 +- tools/python/xen/xend/server/netif.py | 46 +++---- 10 files changed, 208 insertions(+), 113 deletions(-) diff --git a/tools/python/xen/xend/XendDomain.py b/tools/python/xen/xend/XendDomain.py index 4bde519b49..8b9db8101c 100644 --- a/tools/python/xen/xend/XendDomain.py +++ b/tools/python/xen/xend/XendDomain.py @@ -8,6 +8,7 @@ import sys import traceback from twisted.internet import defer +defer.Deferred.debug = 1 from twisted.internet import reactor import xen.lowlevel.xc; xc = xen.lowlevel.xc.new() diff --git a/tools/python/xen/xend/XendDomainInfo.py b/tools/python/xen/xend/XendDomainInfo.py index f14e6477c2..619d575466 100644 --- a/tools/python/xen/xend/XendDomainInfo.py +++ b/tools/python/xen/xend/XendDomainInfo.py @@ -15,6 +15,7 @@ import sys import os from twisted.internet import defer +defer.Deferred.debug = 1 import xen.lowlevel.xc; xc = xen.lowlevel.xc.new() import xen.util.ip @@ -430,14 +431,18 @@ class XendDomainInfo: raise VmError('unknown image type: ' + image_name) image_handler(self, image) deferred = self.configure() + def cbok(x): + print 'vm_create> cbok', x + return x + def cberr(err): + self.destroy() + return err + deferred.addCallback(cbok) + deferred.addErrback(cberr) except StandardError, ex: # Catch errors, cleanup and re-raise. self.destroy() raise - def cbok(x): - print 'vm_create> cbok', x - return x - deferred.addCallback(cbok) print 'vm_create<' return deferred @@ -710,6 +715,10 @@ class XendDomainInfo: self.name = d['name'] self.memory = d['memory']/1024 deferred = self.configure() + def cberr(err): + self.destroy() + return err + deferred.addErrback(cberr) except StandardError, ex: self.destroy() raise diff --git a/tools/python/xen/xend/XendMigrate.py b/tools/python/xen/xend/XendMigrate.py index 471ea7d1dd..a598913792 100644 --- a/tools/python/xen/xend/XendMigrate.py +++ b/tools/python/xen/xend/XendMigrate.py @@ -6,6 +6,7 @@ import time from twisted.internet import reactor from twisted.internet import defer +defer.Deferred.debug = 1 from twisted.internet.protocol import Protocol from twisted.internet.protocol import ClientFactory diff --git a/tools/python/xen/xend/server/SrvBase.py b/tools/python/xen/xend/server/SrvBase.py index bcff1bc3a0..059220c408 100644 --- a/tools/python/xen/xend/server/SrvBase.py +++ b/tools/python/xen/xend/server/SrvBase.py @@ -8,6 +8,7 @@ import types import StringIO from twisted.internet import defer +defer.Deferred.debug = 1 from twisted.internet import reactor from twisted.web import error from twisted.web import resource diff --git a/tools/python/xen/xend/server/SrvDaemon.py b/tools/python/xen/xend/server/SrvDaemon.py index 63c4dd1bee..a514b89108 100644 --- a/tools/python/xen/xend/server/SrvDaemon.py +++ b/tools/python/xen/xend/server/SrvDaemon.py @@ -23,6 +23,7 @@ from twisted.internet import reactor from twisted.internet import protocol from twisted.internet import abstract from twisted.internet import defer +defer.Deferred.debug = 1 from xen.lowlevel import xu diff --git a/tools/python/xen/xend/server/SrvDomainDir.py b/tools/python/xen/xend/server/SrvDomainDir.py index 03c869051e..eb1521e117 100644 --- a/tools/python/xen/xend/server/SrvDomainDir.py +++ b/tools/python/xen/xend/server/SrvDomainDir.py @@ -63,7 +63,8 @@ class SrvDomainDir(SrvDir): "Invalid configuration") try: deferred = self.xd.domain_create(config) - deferred.addCallback(self._cb_op_create, configstring, req) + deferred.addCallback(self._op_create_cb, configstring, req) + deferred.addErrback(self._op_create_err, req) return deferred except Exception, ex: print 'op_create> Exception creating domain:' @@ -75,7 +76,7 @@ class SrvDomainDir(SrvDir): # str(ex)) - def _cb_op_create(self, dominfo, configstring, req): + def _op_create_cb(self, dominfo, configstring, req): """Callback to handle deferred domain creation. """ dom = dominfo.id @@ -95,6 +96,13 @@ class SrvDomainDir(SrvDir): out.close() return val + def _op_create_err(self, err, req): + """Callback to handle errors in deferred domain creation. + """ + print 'op_create> Deferred Exception creating domain:', err + req.setResponseCode(http.BAD_REQUEST, "Error creating domain") + return str(err) + def op_restore(self, op, req): """Restore a domain from file. """ diff --git a/tools/python/xen/xend/server/blkif.py b/tools/python/xen/xend/server/blkif.py index d265ae382e..5596ddb842 100755 --- a/tools/python/xen/xend/server/blkif.py +++ b/tools/python/xen/xend/server/blkif.py @@ -1,6 +1,7 @@ # Copyright (C) 2004 Mike Wray from twisted.internet import defer +defer.Deferred.debug = 1 from xen.xend import sxp from xen.xend import PrettyPrint @@ -20,10 +21,6 @@ class BlkifControllerFactory(controller.ControllerFactory): self.majorTypes = [ CMSG_BLKIF_BE ] self.subTypes = { - CMSG_BLKIF_BE_CREATE : self.recv_be_create, - CMSG_BLKIF_BE_CONNECT : self.recv_be_connect, - CMSG_BLKIF_BE_VBD_CREATE : self.recv_be_vbd_create, - CMSG_BLKIF_BE_VBD_GROW : self.recv_be_vbd_grow, CMSG_BLKIF_BE_DRIVER_STATUS_CHANGED: self.recv_be_driver_status_changed, } self.attached = 1 @@ -35,17 +32,20 @@ class BlkifControllerFactory(controller.ControllerFactory): dom domain recreate if true it's a recreate (after xend restart) """ - d = self.addDeferred() + d = defer.Deferred() blkif = self.getInstanceByDom(dom) if blkif: - self.callDeferred(blkif) + d.callback(blkif) else: blkif = BlkifController(self, dom) self.addInstance(blkif) if recreate: - self.callDeferred(blkif) + d.callback(blkif) else: - blkif.send_be_create() + d1 = defer.Deferred() + d1.addCallback(self.respond_be_create, d) + d1.addErrback(d.errback) + blkif.send_be_create(response=d1) return d def getDomainDevices(self, dom): @@ -117,14 +117,14 @@ class BlkifControllerFactory(controller.ControllerFactory): for blkif in self.getInstances(): blkif.reattached() - def recv_be_create(self, msg, req): - #print 'recv_be_create>' + def respond_be_create(self, msg, d): + print 'respond_be_create>' val = unpackMsg('blkif_be_create_t', msg) blkif = self.getInstanceByDom(val['domid']) - self.callDeferred(blkif) + d.callback(blkif) - def recv_be_connect(self, msg, req): - #print 'recv_be_create>' + def respond_be_connect(self, msg): + print 'respond_be_connect>', self val = unpackMsg('blkif_be_connect_t', msg) blkif = self.getInstanceByDom(val['domid']) if blkif: @@ -132,25 +132,30 @@ class BlkifControllerFactory(controller.ControllerFactory): else: pass - def recv_be_vbd_create(self, msg, req): - #print 'recv_be_vbd_create>' + def respond_be_vbd_create(self, msg, d): + print 'recv_be_vbd_create>', self val = unpackMsg('blkif_be_vbd_create_t', msg) blkif = self.getInstanceByDom(val['domid']) if blkif: - blkif.send_be_vbd_grow(val['vdevice']) + d1 = defer.Deferred() + d1.addCallback(self.respond_be_vbd_grow, d) + if d: d1.addErrback(d.errback) + blkif.send_be_vbd_grow(val['vdevice'], response=d1) else: pass - def recv_be_vbd_grow(self, msg, req): - #print 'recv_be_vbd_grow>' + def respond_be_vbd_grow(self, msg, d): + print 'recv_be_vbd_grow>', self val = unpackMsg('blkif_be_vbd_grow_t', msg) # Check status? if self.attached: - self.callDeferred(0) + if d: + d.callback(0) else: self.reattachDevice(val['domid'], val['vdevice']) def recv_be_driver_status_changed(self, msg, req): + print 'recv_be_driver_status_changed>', self, req val = unpackMsg('blkif_be_driver_status_changed_t', msg) status = val['status'] if status == BLKIF_DRIVER_STATUS_UP and not self.attached: @@ -234,20 +239,22 @@ class BlkifController(controller.Controller): """ dev = self.addDevice(vdev, mode, segment) if not dev: return -1 + d = defer.Deferred() if recreate: - d = defer.Deferred() d.callback(self) else: - self.send_be_vbd_create(vdev) - d = self.factory.addDeferred() + d1 = defer.Deferred() + d1.addCallback(self.factory.respond_be_vbd_create, d) + d1.addErrback(d.errback) + self.send_be_vbd_create(vdev, response=d1) return d def destroy(self): def cb_destroy(val): self.send_be_destroy() - d = self.factory.addDeferred() + d = defer.Deferred() d.addCallback(cb_destroy) - self.send_be_disconnect() + self.send_be_disconnect(response=d) def destroyDevices(self): for dev in self.getDevices(): @@ -259,7 +266,9 @@ class BlkifController(controller.Controller): self.attached = 0 for dev in self.devices.values(): dev.attached = 0 - self.send_be_vbd_create(vdev) + d1 = defer.Deferred() + d1.addCallback(self.factory.respond_be_vbd_create, None) + self.send_be_vbd_create(vdev, response=d1) def reattachDevice(self, vdev): """Reattach a device, when the back-end control domain has changed. @@ -300,46 +309,47 @@ class BlkifController(controller.Controller): 'blkif_handle' : val['handle'], 'evtchn' : self.evtchn['port1'], 'shmem_frame' : val['shmem_frame'] }) - self.factory.writeRequest(msg) - pass + d = defer.Deferred() + d.addCallback(self.factory.respond_be_connect) + self.factory.writeRequest(msg, response=d) - def send_fe_interface_status_changed(self): + def send_fe_interface_status_changed(self, response=None): msg = packMsg('blkif_fe_interface_status_changed_t', { 'handle' : 0, 'status' : BLKIF_INTERFACE_STATUS_CONNECTED, 'evtchn' : self.evtchn['port2'] }) - self.writeRequest(msg) + self.writeRequest(msg, response=response) - def send_be_create(self): + def send_be_create(self, response=None): msg = packMsg('blkif_be_create_t', { 'domid' : self.dom, 'blkif_handle' : 0 }) - self.factory.writeRequest(msg) + self.factory.writeRequest(msg, response=response) - def send_be_disconnect(self): + def send_be_disconnect(self, response=None): print '>BlkifController>send_be_disconnect>', 'dom=', self.dom msg = packMsg('blkif_be_disconnect_t', { 'domid' : self.dom, 'blkif_handle' : 0 }) - self.factory.writeRequest(msg) + self.factory.writeRequest(msg, response=response) - def send_be_destroy(self): + def send_be_destroy(self, response=None): print '>BlkifController>send_be_destroy>', 'dom=', self.dom msg = packMsg('blkif_be_destroy_t', { 'domid' : self.dom, 'blkif_handle' : 0 }) - self.factory.writeRequest(msg) + self.factory.writeRequest(msg, response=response) - def send_be_vbd_create(self, vdev): + def send_be_vbd_create(self, vdev, response=None): dev = self.devices[vdev] msg = packMsg('blkif_be_vbd_create_t', { 'domid' : self.dom, 'blkif_handle' : 0, 'vdevice' : dev.vdev, 'readonly' : dev.readonly() }) - self.factory.writeRequest(msg) + self.factory.writeRequest(msg, response=response) - def send_be_vbd_grow(self, vdev): + def send_be_vbd_grow(self, vdev, response=None): dev = self.devices[vdev] msg = packMsg('blkif_be_vbd_grow_t', { 'domid' : self.dom, @@ -348,9 +358,9 @@ class BlkifController(controller.Controller): 'extent.device' : dev.device, 'extent.sector_start' : dev.start_sector, 'extent.sector_length' : dev.nr_sectors }) - self.factory.writeRequest(msg) + self.factory.writeRequest(msg, response=response) - def send_be_vbd_destroy(self, vdev): + def send_be_vbd_destroy(self, vdev, response=None): print '>BlkifController>send_be_vbd_destroy>', 'dom=', self.dom, 'vdev=', vdev PrettyPrint.prettyprint(self.sxpr()) dev = self.devices[vdev] @@ -359,5 +369,5 @@ class BlkifController(controller.Controller): 'blkif_handle' : 0, 'vdevice' : dev.vdev }) del self.devices[vdev] - self.factory.writeRequest(msg) + self.factory.writeRequest(msg, response=response) diff --git a/tools/python/xen/xend/server/controller.py b/tools/python/xen/xend/server/controller.py index 6804331ac7..6d0bd9295f 100755 --- a/tools/python/xen/xend/server/controller.py +++ b/tools/python/xen/xend/server/controller.py @@ -1,10 +1,39 @@ # Copyright (C) 2004 Mike Wray from twisted.internet import defer +defer.Deferred.debug = 1 import channel from messages import msgTypeName +DEBUG=0 + +class OutOfOrderError(RuntimeError): + """Error reported when a response arrives out of order. + """ + pass + +class Responder: + """Handler for a response to a message. + """ + + def __init__(self, mid, deferred): + """Create a responder. + + mid message id of response to handle + deferred deferred object holding the callbacks + """ + self.mid = mid + self.deferred = deferred + + def responseReceived(self, msg): + if self.deferred.called: return + self.deferred.callback(msg) + + def error(self, err): + if self.deferred.called: return + self.deferred.errback(err) + class CtrlMsgRcvr: """Abstract class for things that deal with a control interface to a domain. @@ -26,6 +55,12 @@ class CtrlMsgRcvr: self.dom = None self.channel = None self.idx = None + self.responders = [] + # Timeout (in seconds) for deferreds. + self.timeout = 10 + + def setTimeout(self, timeout): + self.timeout = timeout def requestReceived(self, msg, type, subtype): """Dispatch a request to handlers. @@ -34,10 +69,13 @@ class CtrlMsgRcvr: type major message type subtype minor message type """ + msgid = msg.get_header()['id'] + if DEBUG: + print 'requestReceived>', self, msgid, msgTypeName(type, subtype) method = self.subTypes.get(subtype) if method: method(msg, 1) - else: + elif DEBUG: print ('requestReceived> No handler: Message type %s %d:%d' % (msgTypeName(type, subtype), type, subtype)), self @@ -48,13 +86,61 @@ class CtrlMsgRcvr: type major message type subtype minor message type """ + msgid = msg.get_header()['id'] + if DEBUG: + print 'responseReceived>', self, msgid, msgTypeName(type, subtype) + if self.callResponders(msg): + return method = self.subTypes.get(subtype) if method: method(msg, 0) - else: + elif DEBUG: print ('responseReceived> No handler: Message type %s %d:%d' % (msgTypeName(type, subtype), type, subtype)), self + def addResponder(self, mid, deferred): + """Add a responder for a message id. + The deferred is called with callback(msg) when a response + with the given message id arrives. Responses are expected + to arrive in order of message id. When a response arrives, + waiting responders for messages with lower id have errback + called with an OutOfOrder error. + + mid message id of response expected + deferred a Deferred to handle the response + + returns Responder + """ + if self.timeout > 0: + deferred.setTimeout(self.timeout) + resp = Responder(mid, deferred) + self.responders.append(resp) + return resp + + def callResponders(self, msg): + """Call any waiting responders for a response message. + + msg response message + + returns 1 if there was a responder for the message, 0 otherwise + """ + hdr = msg.get_header() + mid = hdr['id'] + handled = 0 + while self.responders: + resp = self.responders[0] + if resp.mid > mid: + break + self.responders.pop() + if resp.mid < mid: + print 'handleResponse> Out of order:', resp.mid, mid + resp.error(OutOfOrderError()) + else: + handled = 1 + resp.responseReceived(msg) + break + return handled + def lostChannel(self): """Called when the channel to the domain is lost. """ @@ -64,7 +150,6 @@ class CtrlMsgRcvr: """Register interest in our major message types with the channel to our domain. """ - #print 'CtrlMsgRcvr>registerChannel>', self self.channel = self.channelFactory.domChannel(self.dom) self.idx = self.channel.getIndex() if self.majorTypes: @@ -74,7 +159,6 @@ class CtrlMsgRcvr: """Deregister interest in our major message types with the channel to our domain. """ - #print 'CtrlMsgRcvr>deregisterChannel>', self if self.channel: self.channel.deregisterDevice(self) del self.channel @@ -86,10 +170,16 @@ class CtrlMsgRcvr: """ return 0 - def writeRequest(self, msg): + def writeRequest(self, msg, response=None): """Write a request to the channel. + + msg message + response Deferred to handle the response (optional) """ if self.channel: + if DEBUG: print 'CtrlMsgRcvr>writeRequest>', self, msg + if response: + self.addResponder(msg.get_header()['id'], response) self.channel.writeRequest(msg) else: print 'CtrlMsgRcvr>writeRequest>', 'no channel!', self @@ -98,6 +188,7 @@ class CtrlMsgRcvr: """Write a response to the channel. """ if self.channel: + if DEBUG: print 'CtrlMsgRcvr>writeResponse>', self, msg self.channel.writeResponse(msg) else: print 'CtrlMsgRcvr>writeResponse>', 'no channel!', self @@ -111,7 +202,6 @@ class ControllerFactory(CtrlMsgRcvr): instances : mapping of index to controller instance dlist : list of deferreds dom : domain - timeout : deferred timeout """ def __init__(self): @@ -119,8 +209,6 @@ class ControllerFactory(CtrlMsgRcvr): self.instances = {} self.dlist = [] self.dom = 0 - # Timeout (in seconds) for deferreds. - self.timeout = 10 def addInstance(self, instance): """Add a controller instance (under its index). @@ -161,38 +249,6 @@ class ControllerFactory(CtrlMsgRcvr): """ self.delInstance(instance) - def addDeferred(self): - """Add a deferred object. - - returns deferred - """ - d = defer.Deferred() - if self.timeout > 0: - # The deferred will error if not called before timeout. - d.setTimeout(self.timeout) - self.dlist.append(d) - return d - - def callDeferred(self, *args): - """Call the top deferred object - - args arguments - """ - if self.dlist: - d = self.dlist.pop(0) - if not d.called: - d.callback(*args) - - def errDeferred(self, *args): - """Signal an error to the top deferred object. - - args arguments - """ - if self.dlist: - d = self.dlist.pop(0) - if not d.called: - d.errback(*args) - class Controller(CtrlMsgRcvr): """Abstract class for a device controller attached to a domain. """ diff --git a/tools/python/xen/xend/server/messages.py b/tools/python/xen/xend/server/messages.py index bd2c1eed0e..edda5f2303 100644 --- a/tools/python/xen/xend/server/messages.py +++ b/tools/python/xen/xend/server/messages.py @@ -179,6 +179,12 @@ msg_formats.update(shutdown_formats) class Msg: pass +_next_msgid = 0 + +def nextid(): + global _next_msgid + return ++_next_msgid + def packMsg(ty, params): """Pack a message. Any 'mac' parameter is passed in as an int[6] array and converted. @@ -188,7 +194,8 @@ def packMsg(ty, params): returns xu message """ - if DEBUG: print '>packMsg', ty, params + msgid = nextid() + if DEBUG: print '>packMsg', msgid, ty, params (major, minor) = msg_formats[ty] args = {} for (k, v) in params.items(): @@ -200,7 +207,6 @@ def packMsg(ty, params): if DEBUG: for (k, v) in args.items(): print 'packMsg>', k, v, type(v) - msgid = 0 msg = xu.message(major, minor, msgid, args) return msg @@ -228,7 +234,9 @@ def unpackMsg(ty, msg): args['mac'] = mac for k in macs: del args[k] - if DEBUG: print ' unknown vif=", vif + print "respond_be_connect> unknown vif=", vif pass def recv_be_driver_status_changed(self, msg, req): @@ -168,9 +166,10 @@ class NetDev(controller.Dev): def cb_destroy(val): self.controller.send_be_destroy(self.vif) self.down() - d = self.controller.factory.addDeferred() + #d = self.controller.factory.addDeferred() + d = defer.Deferred() d.addCallback(cb_destroy) - self.controller.send_be_disconnect(self.vif) + self.controller.send_be_disconnect(self.vif, response=d) class NetifController(controller.Controller): @@ -268,20 +267,19 @@ class NetifController(controller.Controller): @param vmac mac address (string) """ self.addDevice(vif, vmac) + d = defer.Deferred() if recreate: - d = defer.Deferred() d.callback(self) else: - d = self.factory.addDeferred() - self.send_be_create(vif) + self.send_be_create(vif, response=d) return d def reattach_devices(self): """Reattach all devices when the back-end control domain has changed. """ - d = self.factory.addDeferred() + #d = self.factory.addDeferred() self.send_be_create(vif) - self.attach_fe_devices(0) + self.attach_fe_devices() def attach_fe_devices(self): for dev in self.devices.values(): @@ -310,37 +308,39 @@ class NetifController(controller.Controller): 'evtchn' : dev.evtchn['port1'], 'tx_shmem_frame' : val['tx_shmem_frame'], 'rx_shmem_frame' : val['rx_shmem_frame'] }) - self.factory.writeRequest(msg) + d = defer.Deferred() + d.addCallback(self.factory.respond_be_connect) + self.factory.writeRequest(msg, response=d) - def send_interface_connected(self, vif): + def send_interface_connected(self, vif, response=None): dev = self.devices[vif] msg = packMsg('netif_fe_interface_status_changed_t', { 'handle' : dev.vif, 'status' : NETIF_INTERFACE_STATUS_CONNECTED, 'evtchn' : dev.evtchn['port2'], 'mac' : dev.mac }) - self.writeRequest(msg) + self.writeRequest(msg, response=response) - def send_be_create(self, vif): + def send_be_create(self, vif, response=None): dev = self.devices[vif] msg = packMsg('netif_be_create_t', { 'domid' : self.dom, 'netif_handle' : dev.vif, 'mac' : dev.mac }) - self.factory.writeRequest(msg) + self.factory.writeRequest(msg, response=response) - def send_be_disconnect(self, vif): + def send_be_disconnect(self, vif, response=None): dev = self.devices[vif] msg = packMsg('netif_be_disconnect_t', { 'domid' : self.dom, 'netif_handle' : dev.vif }) - self.factory.writeRequest(msg) + self.factory.writeRequest(msg, response=response) - def send_be_destroy(self, vif): + def send_be_destroy(self, vif, response=None): PrettyPrint.prettyprint(self.sxpr()) dev = self.devices[vif] del self.devices[vif] msg = packMsg('netif_be_destroy_t', { 'domid' : self.dom, 'netif_handle' : vif }) - self.factory.writeRequest(msg) + self.factory.writeRequest(msg, response=response) -- 2.30.2